{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<!--\n",
    "    Licensed to the Apache Software Foundation (ASF) under one\n",
    "    or more contributor license agreements.  See the NOTICE file\n",
    "    distributed with this work for additional information\n",
    "    regarding copyright ownership.  The ASF licenses this file\n",
    "    to you under the Apache License, Version 2.0 (the\n",
    "    \"License\"); you may not use this file except in compliance\n",
    "    with the License.  You may obtain a copy of the License at\n",
    "\n",
    "      http://www.apache.org/licenses/LICENSE-2.0\n",
    "\n",
    "    Unless required by applicable law or agreed to in writing,\n",
    "    software distributed under the License is distributed on an\n",
    "    \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
    "    KIND, either express or implied.  See the License for the\n",
    "    specific language governing permissions and limitations\n",
    "    under the License.\n",
    "-->\n",
    "\n",
    "# Interactive Beam Running on Flink"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import apache_beam as beam\n",
    "from apache_beam.runners.interactive import interactive_runner\n",
    "from apache_beam.runners.portability import portable_runner\n",
    "from apache_beam.options import pipeline_options"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pipeline options used when submitting through the portable runner.\n",
    "options = pipeline_options.PipelineOptions()\n",
    "\n",
    "# Endpoint of the job service to submit to; change it if yours listens\n",
    "# on a different host or port.\n",
    "options.view_as(pipeline_options.PortableOptions).job_endpoint = 'localhost:8099'\n",
    "options.view_as(pipeline_options.SetupOptions).sdk_location = 'container'\n",
    "\n",
    "# `experiments` is a list-valued option (one entry per experiment), so\n",
    "# assign a list rather than a bare string: with a string, membership\n",
    "# tests become substring matches and iteration yields single characters.\n",
    "options.view_as(pipeline_options.DebugOptions).experiments = ['beam_fn_api']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Placeholder path -- replace it with your own cache directory.\n",
    "cache_dir = \"gs://bucket-name/path/to/dir\"\n",
    "\n",
    "# The interactive runner delegates execution to the portable runner.\n",
    "underlying_runner = portable_runner.PortableRunner()\n",
    "runner = interactive_runner.InteractiveRunner(\n",
    "    underlying_runner=underlying_runner,\n",
    "    cache_dir=cache_dir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
       " -->\n",
       "<!-- Title: G Pages: 1 -->\n",
       "<svg width=\"194pt\" height=\"218pt\"\n",
       " viewBox=\"0.00 0.00 193.60 218.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 214)\">\n",
       "<title>G</title>\n",
       "<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-214 189.596,-214 189.596,4 -4,4\"/>\n",
       "<!-- leaf7581 -->\n",
       "<!-- Create -->\n",
       "<g id=\"node2\" class=\"node\"><title>Create</title>\n",
       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"96.5963\" cy=\"-192\" rx=\"33.5952\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"96.5963\" y=\"-188.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Create</text>\n",
       "</g>\n",
       "<!-- Square -->\n",
       "<g id=\"node3\" class=\"node\"><title>Square</title>\n",
       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"50.5963\" cy=\"-105\" rx=\"35.194\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"50.5963\" y=\"-101.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Square</text>\n",
       "</g>\n",
       "<!-- Create&#45;&gt;Square -->\n",
       "<g id=\"edge4\" class=\"edge\"><title>Create&#45;&gt;Square</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M83.5132,-175.21C79.0649,-169.393 74.2897,-162.607 70.5963,-156 66.4908,-148.655 62.8,-140.295 59.7398,-132.541\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"63.0108,-131.296 56.2171,-123.166 56.4581,-133.758 63.0108,-131.296\"/>\n",
       "<g id=\"a_edge4&#45;label\"><a xlink:title=\"{1, 3, 5, 0, 6, 7, 4, 8, 2, 9}\">\n",
       "<text text-anchor=\"middle\" x=\"90.5963\" y=\"-144.8\" font-family=\"Times,serif\" font-size=\"14.00\">{1, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- Cube -->\n",
       "<g id=\"node5\" class=\"node\"><title>Cube</title>\n",
       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"137.596\" cy=\"-105\" rx=\"29.4969\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"137.596\" y=\"-101.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Cube</text>\n",
       "</g>\n",
       "<!-- Create&#45;&gt;Cube -->\n",
       "<g id=\"edge1\" class=\"edge\"><title>Create&#45;&gt;Cube</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M104.697,-174.207C110.575,-162.021 118.631,-145.318 125.301,-131.491\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"128.46,-132.997 129.652,-122.469 122.155,-129.956 128.46,-132.997\"/>\n",
       "<g id=\"a_edge1&#45;label\"><a xlink:title=\"{1, 3, 5, 0, 6, 7, 4, 8, 2, 9}\">\n",
       "<text text-anchor=\"middle\" x=\"138.596\" y=\"-144.8\" font-family=\"Times,serif\" font-size=\"14.00\">{1, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- leaf7582 -->\n",
       "<!-- Square&#45;&gt;leaf7582 -->\n",
       "<g id=\"edge2\" class=\"edge\"><title>Square&#45;&gt;leaf7582</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M48.775,-86.799C47.5429,-75.1626 45.8896,-59.5479 44.4802,-46.2368\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"47.9485,-45.7513 43.4149,-36.1754 40.9874,-46.4884 47.9485,-45.7513\"/>\n",
       "<g id=\"a_edge2&#45;label\"><a xlink:title=\"{16, 25, 64, 36, 81, 49, 4, 0, 1, 9}\">\n",
       "<text text-anchor=\"middle\" x=\"69.5963\" y=\"-57.8\" font-family=\"Times,serif\" font-size=\"14.00\">{16, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- Cube&#45;&gt;leaf7581 -->\n",
       "<g id=\"edge3\" class=\"edge\"><title>Cube&#45;&gt;leaf7581</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M138.608,-86.799C139.293,-75.1626 140.211,-59.5479 140.994,-46.2368\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"144.493,-46.3637 141.586,-36.1754 137.505,-45.9526 144.493,-46.3637\"/>\n",
       "<g id=\"a_edge3&#45;label\"><a xlink:title=\"{27, 512, 216, 125, 729, 64, 343, 1, 8, 0}\">\n",
       "<text text-anchor=\"middle\" x=\"162.596\" y=\"-57.8\" font-family=\"Times,serif\" font-size=\"14.00\">{27, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Running..."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Using 0 cached PCollections\n",
       "Executing 8 of 3 transforms."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Cube produced {27, 512, 216, 125, 729, ...}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Square produced {16, 25, 64, 36, 81, ...}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Create produced {1, 3, 5, 0, 6, ...}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "p = beam.Pipeline(runner=runner, options=options)\n",
    "init_pcoll = p |  beam.Create(range(10))\n",
    "squares = init_pcoll | 'Square' >> beam.Map(lambda x: x*x)\n",
    "cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x**3)\n",
    "result = p.run()\n",
    "result.wait_until_finish()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "class AverageFn(beam.CombineFn):\n",
    "  \"\"\"CombineFn that computes the arithmetic mean of its inputs.\n",
    "\n",
    "  The accumulator is a (running_total, element_count) pair.\n",
    "  \"\"\"\n",
    "\n",
    "  def create_accumulator(self):\n",
    "    \"\"\"Returns the empty accumulator: a float total and a zero count.\"\"\"\n",
    "    return (0.0, 0)\n",
    "\n",
    "  def add_input(self, sum_count, input):\n",
    "    \"\"\"Folds one element into the (total, count) accumulator.\"\"\"\n",
    "    total, count = sum_count\n",
    "    return total + input, count + 1\n",
    "\n",
    "  def merge_accumulators(self, accumulators):\n",
    "    \"\"\"Adds the totals and the counts of several accumulators.\"\"\"\n",
    "    totals, counts = zip(*accumulators)\n",
    "    return sum(totals), sum(counts)\n",
    "\n",
    "  def extract_output(self, sum_count):\n",
    "    \"\"\"Returns total / count, or NaN when nothing was accumulated.\"\"\"\n",
    "    total, count = sum_count\n",
    "    if not count:\n",
    "      return float('NaN')\n",
    "    return total / count"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
       " -->\n",
       "<!-- Title: G Pages: 1 -->\n",
       "<svg width=\"284pt\" height=\"305pt\"\n",
       " viewBox=\"0.00 0.00 284.34 305.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 301)\">\n",
       "<title>G</title>\n",
       "<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-301 280.339,-301 280.339,4 -4,4\"/>\n",
       "<!-- Square -->\n",
       "<g id=\"node1\" class=\"node\"><title>Square</title>\n",
       "<ellipse fill=\"none\" stroke=\"grey\" cx=\"107.594\" cy=\"-192\" rx=\"35.194\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"107.594\" y=\"-188.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"gray\">Square</text>\n",
       "</g>\n",
       "<!-- Average Square -->\n",
       "<g id=\"node3\" class=\"node\"><title>Average Square</title>\n",
       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"67.594\" cy=\"-105\" rx=\"67.6881\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"67.594\" y=\"-101.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Average Square</text>\n",
       "</g>\n",
       "<!-- Square&#45;&gt;Average Square -->\n",
       "<g id=\"edge1\" class=\"edge\"><title>Square&#45;&gt;Average Square</title>\n",
       "<path fill=\"none\" stroke=\"red\" d=\"M99.6913,-174.207C94.085,-162.293 86.4474,-146.063 80.0289,-132.424\"/>\n",
       "<polygon fill=\"red\" stroke=\"red\" points=\"83.0421,-130.607 75.6172,-123.049 76.7084,-133.588 83.0421,-130.607\"/>\n",
       "<g id=\"a_edge1&#45;label\"><a xlink:title=\"{16, 25, 64, 36, 81, 49, 4, 0, 1, 9}\">\n",
       "<text text-anchor=\"middle\" x=\"113.594\" y=\"-144.8\" font-family=\"Times,serif\" font-size=\"14.00\">{16, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- Create -->\n",
       "<g id=\"node2\" class=\"node\"><title>Create</title>\n",
       "<ellipse fill=\"none\" stroke=\"grey\" cx=\"152.594\" cy=\"-279\" rx=\"33.5952\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"152.594\" y=\"-275.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"gray\">Create</text>\n",
       "</g>\n",
       "<!-- Create&#45;&gt;Square -->\n",
       "<g id=\"edge3\" class=\"edge\"><title>Create&#45;&gt;Square</title>\n",
       "<path fill=\"none\" stroke=\"grey\" d=\"M139.447,-262.245C134.993,-256.432 130.23,-249.639 126.594,-243 122.624,-235.751 119.118,-227.499 116.233,-219.818\"/>\n",
       "<polygon fill=\"grey\" stroke=\"grey\" points=\"119.456,-218.434 112.812,-210.181 112.859,-220.776 119.456,-218.434\"/>\n",
       "<g id=\"a_edge3&#45;label\"><a xlink:title=\"{1, 3, 5, 0, 6, 7, 4, 8, 2, 9}\">\n",
       "<text text-anchor=\"middle\" x=\"146.594\" y=\"-231.8\" font-family=\"Times,serif\" font-size=\"14.00\">{1, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- Cube -->\n",
       "<g id=\"node6\" class=\"node\"><title>Cube</title>\n",
       "<ellipse fill=\"none\" stroke=\"grey\" cx=\"193.594\" cy=\"-192\" rx=\"29.4969\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"193.594\" y=\"-188.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"gray\">Cube</text>\n",
       "</g>\n",
       "<!-- Create&#45;&gt;Cube -->\n",
       "<g id=\"edge6\" class=\"edge\"><title>Create&#45;&gt;Cube</title>\n",
       "<path fill=\"none\" stroke=\"grey\" d=\"M160.694,-261.207C166.572,-249.021 174.629,-232.318 181.298,-218.491\"/>\n",
       "<polygon fill=\"grey\" stroke=\"grey\" points=\"184.458,-219.997 185.65,-209.469 178.153,-216.956 184.458,-219.997\"/>\n",
       "<g id=\"a_edge6&#45;label\"><a xlink:title=\"{1, 3, 5, 0, 6, 7, 4, 8, 2, 9}\">\n",
       "<text text-anchor=\"middle\" x=\"194.594\" y=\"-231.8\" font-family=\"Times,serif\" font-size=\"14.00\">{1, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- leaf7567 -->\n",
       "<!-- Average Square&#45;&gt;leaf7567 -->\n",
       "<g id=\"edge2\" class=\"edge\"><title>Average Square&#45;&gt;leaf7567</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M67.594,-86.799C67.594,-75.1626 67.594,-59.5479 67.594,-46.2368\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"71.0941,-46.1754 67.594,-36.1754 64.0941,-46.1755 71.0941,-46.1754\"/>\n",
       "<g id=\"a_edge2&#45;label\"><a xlink:title=\"{28.5}\">\n",
       "<text text-anchor=\"middle\" x=\"86.594\" y=\"-57.8\" font-family=\"Times,serif\" font-size=\"14.00\">{28.5}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- leaf7570 -->\n",
       "<!-- Average Cube -->\n",
       "<g id=\"node7\" class=\"node\"><title>Average Cube</title>\n",
       "<ellipse fill=\"none\" stroke=\"blue\" cx=\"214.594\" cy=\"-105\" rx=\"61.99\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"214.594\" y=\"-101.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"blue\">Average Cube</text>\n",
       "</g>\n",
       "<!-- Cube&#45;&gt;Average Cube -->\n",
       "<g id=\"edge4\" class=\"edge\"><title>Cube&#45;&gt;Average Cube</title>\n",
       "<path fill=\"none\" stroke=\"red\" d=\"M197.844,-173.799C200.719,-162.163 204.576,-146.548 207.865,-133.237\"/>\n",
       "<polygon fill=\"red\" stroke=\"red\" points=\"211.35,-133.723 210.351,-123.175 204.554,-132.044 211.35,-133.723\"/>\n",
       "<g id=\"a_edge4&#45;label\"><a xlink:title=\"{27, 512, 216, 125, 729, 64, 343, 1, 8, 0}\">\n",
       "<text text-anchor=\"middle\" x=\"227.594\" y=\"-144.8\" font-family=\"Times,serif\" font-size=\"14.00\">{27, ...}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "<!-- Average Cube&#45;&gt;leaf7570 -->\n",
       "<g id=\"edge5\" class=\"edge\"><title>Average Cube&#45;&gt;leaf7570</title>\n",
       "<path fill=\"none\" stroke=\"black\" d=\"M214.594,-86.799C214.594,-75.1626 214.594,-59.5479 214.594,-46.2368\"/>\n",
       "<polygon fill=\"black\" stroke=\"black\" points=\"218.094,-46.1754 214.594,-36.1754 211.094,-46.1755 218.094,-46.1754\"/>\n",
       "<g id=\"a_edge5&#45;label\"><a xlink:title=\"{202.5}\">\n",
       "<text text-anchor=\"middle\" x=\"237.094\" y=\"-57.8\" font-family=\"Times,serif\" font-size=\"14.00\">{202.5}</text>\n",
       "</a>\n",
       "</g>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Running..."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Using 2 cached PCollections\n",
       "Executing 8 of 5 transforms."
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Average Cube produced {202.5}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Cube produced {27, 512, 216, 125, 729, ...}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Square produced {16, 25, 64, 36, 81, ...}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "text/plain": [
       "Average Square produced {28.5}"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "average_square = squares | 'Average Square' >> beam.CombineGlobally(AverageFn())\n",
    "average_cube = cubes | 'Average Cube' >> beam.CombineGlobally(AverageFn())\n",
    "result = p.run()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python (beam_venv)",
   "language": "python",
   "name": "beam_venv_kernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
